package org.zjvis.datascience.service.graph;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.nimbusds.jose.util.IOUtils;
import org.apache.commons.math3.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zjvis.datascience.common.dto.graph.GraphDTO;
import org.zjvis.datascience.common.dto.graph.GraphInstanceDTO;
import org.zjvis.datascience.common.exception.BaseErrorCode;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.graph.enums.GraphFileFormatEnum;
import org.zjvis.datascience.common.graph.model.Graph;
import org.zjvis.datascience.common.graph.util.GraphUtil;
import org.zjvis.datascience.common.graph.exporter.ExporterJSON;
import org.zjvis.datascience.common.graph.importer.ImporterCSV;
import org.zjvis.datascience.common.graph.importer.ImporterGML;
import org.zjvis.datascience.common.graph.model.DefaultNodeStyle;
import org.zjvis.datascience.common.vo.graph.*;
import org.zjvis.datascience.service.MinioService;

import java.io.*;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

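/**
 * @description GraphParser: a {@link Callable} task that loads the stored graph, imports nodes and
 *              links from an uploaded JSON/CSV/GML file or from a graph-build source, and persists
 *              the merged result.
 * @date 2021-12-29
 */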
public class GraphParser implements Callable<Void> {
    private final static Logger logger = LoggerFactory.getLogger(GraphParser.class);

    /** Number of worker threads created by {@code doWithExecutor}. */
    private final static int poolSize = 10;

    /** Source of the uploaded file; read in {@code parse()} via {@code getObject(bucketPath, fileName)}. */
    private final MinioService minioService;

    /** Receives the running / fail / success status updates for {@code graphInstance}. */
    private final GraphInstanceService graphInstanceService;

    /** Used to load and update the stored {@code GraphDTO}. */
    private final GraphService graphService;

    /** Target of {@code write2Janus()}. */
    private final JanusGraphEmbedService janusGraphEmbedService;


    /** Input format; selects the parser branch in {@code parse()}. */
    private GraphFileFormatEnum fileType;
    private Long graphInstanceId;
    private Long graphId;
    /** Object name and bucket passed to {@code minioService.getObject}; either may be null. */
    private String fileName;
    private String bucketPath;
    /** Set to true by {@code init} when the stored graph already contains data. */
    private boolean isAppend;
    /** Category id -> (attribute name -> attribute type code). */
    private Map<String, Map<String, Integer>> cIdAttrHelper;
    /** Edge id -> (attribute name -> attribute type code). */
    private Map<String, Map<String, Integer>> eIdAttrHelper;
    /** Category id -> category. */
    private Map<String, CategoryVO> cIdMap;
    /** Edge id -> edge. */
    private Map<String, EdgeVO> eIdMap;
    /** Node id -> node. */
    private Map<String, NodeVO> nIdMap;
    /** Link id -> link. */
    private Map<String, LinkVO> lIdMap;
    /** Category id -> ids of the nodes in that category. */
    private Map<String, List<String>> cId2nIdMap;
    /** Edge id -> ids of the links of that edge. */
    private Map<String, List<String>> eId2lIdMap;
    /** Edge id -> (source category id, target category id). */
    private Map<String, Pair<String, String>> edgeHelper;

    private DefaultNodeStyle defaultStyle;
    /** Running index into the default colour loop, advanced for each newly seen category. */
    private int colorPtr;
    /** Category id -> fill colour. */
    private Map<String ,String> colorHelper;
    /** Built by {@code init} from the stored nodes and links; stays null when the stored graph is empty. */
    private Graph graph;

    private GraphInstanceDTO graphInstance;
    /**
     * Creates a parser task for one graph-load instance.
     *
     * @param fileType               format of the input, used to pick the parser in {@link #parse()}
     * @param minioService           service the input file is read from
     * @param graphInstanceService   service that records the running/fail/success status
     * @param graphService           service used to load and update the stored graph
     * @param janusGraphEmbedService service the parsed graph is written to
     * @param instance               the load instance; its id and graph id are captured here
     * @param bucketPath             bucket of the input file, may be null
     * @param fileName               object name of the input file, may be null
     */
    public GraphParser(GraphFileFormatEnum fileType,
                       MinioService minioService,
                       GraphInstanceService graphInstanceService,
                       GraphService graphService,
                       JanusGraphEmbedService janusGraphEmbedService,
                       GraphInstanceDTO instance,
                       String bucketPath,
                       String fileName) {
        // What to load and where it comes from.
        this.fileType = fileType;
        this.bucketPath = bucketPath;
        this.fileName = fileName;

        // The instance being processed and the ids derived from it.
        this.graphInstance = instance;
        this.graphInstanceId = instance.getId();
        this.graphId = instance.getGraphId();

        // Collaborating services.
        this.minioService = minioService;
        this.graphInstanceService = graphInstanceService;
        this.graphService = graphService;
        this.janusGraphEmbedService = janusGraphEmbedService;
    }

    /**
     * Task entry point: loads the stored graph JSON, rebuilds the in-memory indexes from it and
     * then runs the import.
     *
     * @return always {@code null}
     */
    @Override
    public Void call() throws Exception {
        GraphDTO storedGraph = graphService.queryById(graphId);
        JSONObject storedData = JSONObject.parseObject(storedGraph.getDataJson());
        init(storedData);
        parse();
        return null;
    }

    /**
     * Resets every helper structure and, when the stored graph already has content, re-indexes
     * that content so the import appends to it.
     *
     * <p>A missing {@code categories}/{@code edges}/{@code nodes}/{@code links} array is treated
     * as empty, and a category with no entry in the stored {@code colorHelper} receives the next
     * colour of the default loop instead of failing with a {@link NullPointerException}.
     *
     * @param obj the stored graph JSON
     */
    public void init(JSONObject obj) {
        List<CategoryVO> categories = readVoList(obj, "categories", CategoryVO.class);
        List<EdgeVO> edges = readVoList(obj, "edges", EdgeVO.class);
        List<NodeVO> nodes = readVoList(obj, "nodes", NodeVO.class);
        List<LinkVO> links = readVoList(obj, "links", LinkVO.class);
        this.cIdAttrHelper = new HashMap<>();
        this.eIdAttrHelper = new HashMap<>();
        this.cIdMap = new HashMap<>();
        this.eIdMap = new HashMap<>();
        this.nIdMap = new HashMap<>();
        this.lIdMap = new HashMap<>();
        this.cId2nIdMap = new HashMap<>();
        this.eId2lIdMap = new HashMap<>();
        this.edgeHelper = new HashMap<>();
        this.defaultStyle = new DefaultNodeStyle();
        this.colorPtr = 0;
        this.colorHelper = new HashMap<>();
        this.isAppend = false;
        if (categories.isEmpty() && edges.isEmpty() && nodes.isEmpty() && links.isEmpty()) {
            // Nothing stored yet: this is a fresh import, the empty structures are all we need.
            return;
        }
        isAppend = true;
        graph = new Graph(nodes, links);
        // May be null when the stored JSON carries no colour table.
        JSONObject colorHelperObj = obj.getJSONObject("colorHelper");
        for (CategoryVO category : categories) {
            String cid = category.getId();
            cIdMap.put(cid, category);
            Map<String, Integer> attrTypes = new HashMap<>();
            for (CategoryAttrVO attr : category.getAttrs()) {
                attrTypes.put(attr.getName(), attr.getType());
            }
            cIdAttrHelper.put(cid, attrTypes);
            cId2nIdMap.put(cid, new ArrayList<>());
            String color = colorHelperObj == null ? null : colorHelperObj.getString(cid);
            if (color == null) {
                // Same colour assignment as for a newly discovered category: next slot of the loop.
                color = defaultStyle.getColorLoop().get(colorHelper.size() % defaultStyle.getColorLoopSize());
            }
            colorHelper.put(cid, color);
        }
        for (EdgeVO edge : edges) {
            String eid = edge.getId();
            eIdMap.put(eid, edge);
            Map<String, Integer> attrTypes = new HashMap<>();
            for (CategoryAttrVO attr : edge.getAttrs()) {
                attrTypes.put(attr.getName(), attr.getType());
            }
            eIdAttrHelper.put(eid, attrTypes);
            eId2lIdMap.put(eid, new ArrayList<>());
            edgeHelper.put(eid, new Pair<>(edge.getSource(), edge.getTarget()));
        }
        for (NodeVO node : nodes) {
            nIdMap.put(node.getId(), node);
            cId2nIdMap.get(node.getCategoryId()).add(node.getId());
        }
        for (LinkVO link : links) {
            lIdMap.put(link.getId(), link);
            eId2lIdMap.get(link.getEdgeId()).add(link.getId());
        }
        // New categories continue the colour loop after the ones already in use.
        colorPtr = colorHelper.size();
    }

    /** Reads {@code key} as a list of {@code type}; an absent array yields an empty list. */
    private static <T> List<T> readVoList(JSONObject obj, String key, Class<T> type) {
        JSONArray array = obj.getJSONArray(key);
        if (array == null) {
            return new ArrayList<T>();
        }
        return array.toJavaList(type);
    }

    /**
     * Runs one import: marks the instance as running, reads the input, parses it according to
     * {@code fileType}, regenerates categories/edges, writes the result to JanusGraph and the
     * stored graph, and finally marks the instance as succeeded.
     *
     * <p>Failures while fetching or parsing the input mark the instance as failed and return.
     * Failures while persisting the parsed graph also mark the instance as failed (so it does not
     * stay in the running state) and are then rethrown unchanged. The input stream is always
     * closed.
     */
    public void parse() {
        graphInstanceService.setStatusRunning(graphInstance);
        long t1 = System.currentTimeMillis();
        InputStream in = null;
        try {
            if (bucketPath != null && fileName != null) {
                in = minioService.getObject(bucketPath, fileName);
            }
        } catch (Exception e) {
            graphInstanceService.setStatusFail(graphInstance, graphId.toString(), e.toString());
            return;
        }
        try {
            if (fileType.getVal() == GraphFileFormatEnum.JSON.getVal()) {
                this.parseJSON(in);
            } else if (fileType.getVal() == GraphFileFormatEnum.CSV.getVal()) {
                this.parseCSV(in);
            } else if (fileType.getVal() == GraphFileFormatEnum.GML.getVal()) {
                this.parseGML(in);
            } else if (fileType.getVal() == GraphFileFormatEnum.GRAPH_BUILD.getVal()) {
                String content = this.loadFromBuild();
                this.parseJSON(content);
            }
        } catch (DataScienceException e) {
            graphInstanceService.setStatusFail(graphInstance, graphId.toString(), e.getApiResult().getMessage());
            return;
        } catch (Exception e) {
            graphInstanceService.setStatusFail(graphInstance, graphId.toString(), e.toString());
            return;
        } finally {
            // Not every branch consumes or closes the stream (GRAPH_BUILD never touches it).
            closeQuietly(in);
        }
        try {
            generateCategory();
            generateEdge();
            repairAttrs();
            GraphDTO stored = graphService.queryById(graphId);
            JSONObject obj = JSON.parseObject(stored.getDataJson());
            obj.put("categories", this.getCategories());
            obj.put("edges", this.getEdges());
            obj.put("nodes", this.getNodes());
            obj.put("links", this.getLinks());
            obj.put("colorHelper", this.colorHelper);
            JSONObject graphInfo = obj.getJSONObject("graphInfo");
            graphInfo.put("numOfNodes", nIdMap.keySet().size());
            graphInfo.put("numOfLinks", lIdMap.keySet().size());
            graphInfo.put("directed", this.getLinks().stream().anyMatch(LinkVO::getDirected));
            obj.put("graphInfo", graphInfo);

            Map<String, String> idMap = this.write2Janus();
            obj.put("idMap", idMap);

            stored.setDataJson(obj.toJSONString());
            graphService.update(stored);
        } catch (RuntimeException e) {
            // Record the failure, then let the exception reach the caller exactly as before.
            graphInstanceService.setStatusFail(graphInstance, graphId.toString(), e.toString());
            throw e;
        }

        long t2 = System.currentTimeMillis();
        Long duringTime = t2 - t1;
        graphInstanceService.setStatusSuccess(graphInstance, duringTime, graphId.toString());
    }

    /** Closes {@code stream} if it is non-null; a close failure is logged and otherwise ignored. */
    private static void closeQuietly(InputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            logger.warn("Failed to close graph input stream", e);
        }
    }

    /**
     * Reads the whole stream as UTF-8 text and parses it as a JSON graph document.
     *
     * @param in stream holding the JSON document
     */
    private void parseJSON(InputStream in) throws IOException, DataScienceException {
        parseJSON(IOUtils.readInputStreamToString(in, StandardCharsets.UTF_8));
    }

    /**
     * Parses a JSON graph document: every element of {@code nodes} is loaded as a node, then
     * every element of {@code links} as a link. Either array may be absent.
     *
     * @param JSONString the JSON document text
     */
    private void parseJSON(String JSONString) throws IOException, DataScienceException {
        JSONObject document = JSON.parseObject(JSONString);
        JSONArray nodeEntries = document.getJSONArray("nodes");
        JSONArray linkEntries = document.getJSONArray("links");

        // Nodes go first: links are resolved against the nodes already loaded.
        if (nodeEntries != null) {
            for (Object nodeEntry : nodeEntries) {
                parseLoadNode(nodeEntry);
            }
        }
        if (linkEntries != null) {
            for (Object linkEntry : linkEntries) {
                parseLoadLink(linkEntry);
            }
        }

        logger.info("success...: {} {}", nIdMap.size(), lIdMap.size());
    }

    /**
     * Parses a CSV file. The header names decide (via {@link #checkHeads(List)}) whether the rows
     * are loaded as nodes or as links.
     *
     * @param in stream holding the CSV content
     */
    private void parseCSV(InputStream in) throws IOException, DataScienceException {
        JSONObject parsed = ImporterCSV.parseCSV2JSON(in, null);

        JSONArray headArray = parsed.getJSONArray("head");
        int headCount = headArray.size();
        List<String> columnNames = new ArrayList<>();
        for (int idx = 0; idx < headCount; idx++) {
            columnNames.add(headArray.getJSONObject(idx).getString("name"));
        }

        String loadType = checkHeads(columnNames);
        JSONArray rows = parsed.getJSONArray("data");
        switch (loadType) {
            case "node":
                rows.forEach(this::parseLoadNode);
                break;
            case "link":
                rows.forEach(this::parseLoadLink);
                break;
            default:
                break;
        }
    }

    /**
     * Parses a GML file through {@link ImporterGML} and loads the nodes, then the links, that
     * the importer produced.
     *
     * @param in stream holding the GML content
     */
    private void parseGML(InputStream in) throws IOException {
        ImporterGML importer = new ImporterGML();
        importer.execute(in);

        JSONArray importedNodes = importer.getNodes();
        JSONArray importedLinks = importer.getLinks();
        if (importedNodes != null) {
            for (Object nodeEntry : importedNodes) {
                parseLoadNode(nodeEntry);
            }
        }
        if (importedLinks != null) {
            for (Object linkEntry : importedLinks) {
                parseLoadLink(linkEntry);
            }
        }
    }

    /**
     * Exports the graph referenced by {@code graph_build_source} in the instance's data JSON as a
     * JSON string, so it can be fed to {@code parseJSON(String)}.
     *
     * @return the JSON produced by {@link ExporterJSON}
     */
    private String loadFromBuild() {
        JSONObject instanceData = JSONObject.parseObject(graphInstance.getDataJson());
        Long sourceGraphId = instanceData.getLong("graph_build_source");

        GraphDTO sourceGraph = graphService.queryById(sourceGraphId);
        JSONObject sourceData = JSONObject.parseObject(sourceGraph.getDataJson());
        List<CategoryVO> sourceCategories = sourceData.getJSONArray("categories").toJavaList(CategoryVO.class);
        List<NodeVO> sourceNodes = sourceData.getJSONArray("nodes").toJavaList(NodeVO.class);
        List<LinkVO> sourceLinks = sourceData.getJSONArray("links").toJavaList(LinkVO.class);
        // Category id -> label, handed to the exporter.
        Map<String, String> labelsByCategory = sourceCategories.stream()
                .collect(Collectors.toMap(c -> c.getId(), c -> c.getLabel()));
        JSONObject multiValueHelper = sourceData.getJSONObject("multiValueHelper");

        ExporterJSON exporter = new ExporterJSON();
        exporter.setGraphId(sourceGraphId);
        exporter.setNodes(sourceNodes);
        exporter.setLinks(sourceLinks);
        exporter.setMultiValueHelper(multiValueHelper);
        exporter.setLabelMap(labelsByCategory);
        exporter.execute();
        return exporter.getJSONString();
    }

    /**
     * Parses one node record and merges it into the in-memory graph state.
     *
     * <p>The record must carry an integer {@code id} and a {@code categoryId}; {@code label},
     * {@code attributes} and {@code config} are optional. A node whose id already exists replaces
     * the previous node. If its category changed, the stored links touching it are re-parsed so
     * that their edge ids follow the new category.
     *
     * @param o the node record, a {@link JSONObject} or any object convertible to one
     * @throws DataScienceException if a required key is missing, an id is not an integer, or an
     *                              attribute's type conflicts with the type recorded for its category
     */
    private void parseLoadNode(Object o) throws DataScienceException {
        JSONObject nodeObj = o instanceof JSONObject ? (JSONObject)o : (JSONObject)JSONObject.toJSON(o);
        NodeVO node = new NodeVO();
        Integer id = checkInteger(nodeObj, "id");
        String label = nodeObj.getString("label");
        String cid = nodeObj.getString("categoryId");
        if (id == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "id");
        }
        if (cid == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "categoryId");
        }
        List<AttrVO> attrs = new ArrayList<>();
        JSONObject attrsObj = checkJSONObject(nodeObj, "attributes");
        // First time this category is seen: give it the next colour of the default loop.
        if (cIdAttrHelper.putIfAbsent(cid, new HashMap<>()) == null) {
            colorHelper.put(cid, defaultStyle.getColorLoop().get(colorPtr++ % defaultStyle.getColorLoopSize()));
        }
        if (attrsObj != null) {
            for (String key: attrsObj.keySet()) {
                Object val = attrsObj.get(key);
                if (val instanceof BigDecimal) {
                    val = ((BigDecimal) val).doubleValue();
                }
                // Type codes: 12 for strings, 2 for everything else
                // (these look like java.sql.Types VARCHAR / NUMERIC -- TODO confirm).
                int type = val instanceof String ? 12 : 2;
                attrs.add(
                        AttrVO.builder()
                                .key(key)
                                .value(val)
                                .type(type)
                                .build()
                );
                Map<String, Integer> cMap = cIdAttrHelper.get(cid);
                Object preType = cMap.putIfAbsent(key, type);
                if (preType != null && !preType.equals(type)) {
                    throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_ATTRS_TYPE_CONFLICT, key);
                }
            }
        }
        node.setId(GraphUtil.aliasNodeId(id.toString(), graphId));
        node.setLabel(label != null ? label :"");
        node.setCategoryId(cid);
        node.setAttrs(attrs);
        node.setOrderId(id);
        JSONObject config = checkJSONObject(nodeObj, "config");
        if (config != null) {
            JSONObject style = config.getJSONObject("style");
            if (style == null) {
                style = new JSONObject();
                style.put("fill", colorHelper.get(cid));
                style.put("stroke", defaultStyle.getStyle().getString("stroke"));
            } else if (style.get("fill") == null) {
                style.put("fill", colorHelper.get(cid));
            }
            node.setStyle(style);

            JSONObject labelCfg = config.getJSONObject("labelCfg");
            node.setLabelCfg(labelCfg == null ? JSONObject.parseObject(defaultStyle.getLabelCfg()) : labelCfg);
            Integer size = config.getInteger("size");
            node.setSize(size == null ? defaultStyle.getSize() : size);
            node.setType(config.getString("type") == null ? defaultStyle.getDefaultType() : config.getString("type"));
            node.setX(config.getInteger("x"));
            node.setY(config.getInteger("y"));
        } else {
            // No config supplied: category colour plus the default stroke, label config and size.
            JSONObject style = new JSONObject();
            style.put("fill", colorHelper.get(cid));
            style.put("stroke", defaultStyle.getStyle().getString("stroke"));
            node.setStyle(style);
            node.setLabelCfg(JSONObject.parseObject(defaultStyle.getLabelCfg()));
            node.setSize(defaultStyle.getSize());
        }
        NodeVO previous = nIdMap.put(node.getId(), node);
        if (previous != null) {
            cId2nIdMap.get(previous.getCategoryId()).remove(previous.getId());
            // `graph` is only built by init() when the stored graph already had content. On a
            // fresh import it is null and there are no stored links to re-home, so skip the
            // re-linking instead of failing with a NullPointerException.
            if (!previous.getCategoryId().equals(cid) && graph != null) {
                graph.getOutLinks(node.getId()).forEach(l -> {
                    LinkVO link = lIdMap.get(l.getId());
                    JSONObject linkObj = link.toJSONObject();
                    linkObj.put("source", node.getOrderId());
                    linkObj.put("target", nIdMap.get(link.getTarget()).getOrderId());
                    parseLoadLink(linkObj);
                });
                graph.getInLinks(node.getId()).forEach(l -> {
                    LinkVO link = lIdMap.get(l.getId());
                    JSONObject linkObj = link.toJSONObject();
                    linkObj.put("source", nIdMap.get(link.getSource()).getOrderId());
                    linkObj.put("target", node.getOrderId());
                    parseLoadLink(linkObj);
                });
            }
        }
        cId2nIdMap.putIfAbsent(cid, new ArrayList<>());
        cId2nIdMap.get(cid).add(node.getId());
    }

    /**
     * Parses one link record and merges it into the in-memory graph state.
     *
     * <p>The record must carry integer {@code id}, {@code source} and {@code target}, and both
     * endpoints must already be loaded. The edge id is derived from the categories of the two
     * endpoints as {@code <sourceCategory>_<targetCategory>}. A link whose id already exists
     * replaces the previous link.
     *
     * @param o the link record, a {@link JSONObject} or any object convertible to one
     * @throws DataScienceException if a required key is missing, an id is not an integer, an
     *                              endpoint is unknown, or an attribute's type conflicts with the
     *                              type recorded for its edge
     */
    private void parseLoadLink(Object o) throws DataScienceException {
        JSONObject linkObj = o instanceof JSONObject ? (JSONObject)o : (JSONObject)JSONObject.toJSON(o);
        LinkVO link = new LinkVO();
        Integer id = checkInteger(linkObj, "id");
        String label = linkObj.getString("label");
        Integer source = checkInteger(linkObj, "source");
        Integer target = checkInteger(linkObj, "target");
        if (id == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "id");
        }
        if (source == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "source");
        }
        if (target == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "target");
        }
        link.setSource(GraphUtil.aliasNodeId(source.toString(), graphId));
        link.setTarget(GraphUtil.aliasNodeId(target.toString(), graphId));
        NodeVO srcNode = nIdMap.get(link.getSource());
        NodeVO tarNode = nIdMap.get(link.getTarget());
        if (srcNode == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_NODE_NOT_FOUND, "source: " + source);
        }
        if (tarNode == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_NODE_NOT_FOUND, "target: " + target);
        }
        String srcCid = srcNode.getCategoryId();
        String tarCid = tarNode.getCategoryId();
        Pair<String, String> pair = new Pair<>(srcCid, tarCid);
        String eid = srcCid + "_" + tarCid;
        edgeHelper.putIfAbsent(eid, pair);
        List<AttrVO> attrs = new ArrayList<>();
        JSONObject attrsObj = checkJSONObject(linkObj, "attributes");
        eIdAttrHelper.putIfAbsent(eid, new HashMap<>());
        if (attrsObj != null) {
            for (String key: attrsObj.keySet()) {
                Object val = attrsObj.get(key);
                // Same normalisation as for node attributes: store decimals as double.
                if (val instanceof BigDecimal) {
                    val = ((BigDecimal) val).doubleValue();
                }
                // Type codes: 12 for strings, 2 for everything else
                // (these look like java.sql.Types VARCHAR / NUMERIC -- TODO confirm).
                int type = val instanceof String ? 12 : 2;
                attrs.add(
                        AttrVO.builder()
                                .key(key)
                                .value(val)
                                .type(type)
                                .build()
                );
                Map<String, Integer> eMap = eIdAttrHelper.get(eid);
                Object preType = eMap.putIfAbsent(key, type);
                if (preType != null && !preType.equals(type)) {
                    // Name the offending attribute, as the node loader does.
                    throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_ATTRS_TYPE_CONFLICT, key);
                }
            }
        }
        link.setId(GraphUtil.aliasLinkId(id.toString(), graphId));
        link.setLabel(label != null ? label :"");
        link.setEdgeId(eid);
        Boolean directed = linkObj.getBoolean("directed");
        link.setDirected(directed != null ? directed : false);
        Double weight = linkObj.getDouble("weight");
        link.setWeight(weight != null ? weight : 1.0);
        link.setAttrs(attrs);

        JSONObject config = checkJSONObject(linkObj, "config");
        if (config != null) {
            link.setStyle(config.getJSONObject("style") == null ? new JSONObject() : config.getJSONObject("style"));
            link.setLabelCfg(config.getJSONObject("labelCfg") == null ? new JSONObject() : config.getJSONObject("labelCfg"));
            link.setType(config.getString("type") == null ? "line" : config.getString("type"));
            link.setCurveOffset(config.getFloat("curveOffset"));
            link.setControlPoints(config.getJSONArray("controlPoints"));
        }
        link.setOrderId(id);
        LinkVO previous = lIdMap.put(link.getId(), link);
        if (previous != null) {
            // The replaced link may have belonged to a different edge; drop it from that edge's list.
            eId2lIdMap.get(previous.getEdgeId()).remove(previous.getId());
        }
        eId2lIdMap.putIfAbsent(eid, new ArrayList<>());
        eId2lIdMap.get(eid).add(link.getId());
    }

    /**
     * Rebuilds {@code cIdMap} from the attribute bookkeeping: categories that no longer have any
     * node are dropped from all category structures, every other category gets a freshly built
     * {@link CategoryVO} whose label is its id.
     */
    public void generateCategory() {
        // Iterate over a snapshot because empty categories are removed along the way.
        List<String> knownCategoryIds = new ArrayList<>(cIdAttrHelper.keySet());
        for (String cid : knownCategoryIds) {
            boolean hasNodes = !cId2nIdMap.get(cid).isEmpty();
            if (!hasNodes) {
                cIdAttrHelper.remove(cid);
                cIdMap.remove(cid);
                cId2nIdMap.remove(cid);
                continue;
            }

            List<CategoryAttrVO> categoryAttrs = cIdAttrHelper.get(cid).entrySet().stream()
                    .map(entry -> CategoryAttrVO.builder()
                            .column(entry.getKey())
                            .name(entry.getKey())
                            .type(entry.getValue())
                            .build())
                    .collect(Collectors.toCollection(ArrayList::new));

            // Generate a label slot named after the category itself.
            CategoryAttrVO originLabel = CategoryAttrVO.builder().name(cid).column(cid).type(12).build();

            CategoryVO rebuilt = new CategoryVO();
            rebuilt.setId(cid);
            rebuilt.setLabel(cid);
            rebuilt.setTableInfo(new JSONObject());
            rebuilt.setAttrs(categoryAttrs);
            rebuilt.setOriginLabel(originLabel);
            cIdMap.put(cid, rebuilt);
        }
    }

    /**
     * Rebuilds {@code eIdMap} from the attribute bookkeeping: edges that no longer have any link
     * are dropped from all edge structures, every other edge gets a freshly built {@link EdgeVO}
     * connecting the source and target categories recorded in {@code edgeHelper}.
     */
    public void generateEdge() {
        // Iterate over a snapshot because empty edges are removed along the way.
        List<String> knownEdgeIds = new ArrayList<>(eIdAttrHelper.keySet());
        for (String eid : knownEdgeIds) {
            boolean hasLinks = !eId2lIdMap.get(eid).isEmpty();
            if (!hasLinks) {
                eIdAttrHelper.remove(eid);
                eIdMap.remove(eid);
                eId2lIdMap.remove(eid);
                continue;
            }

            Pair<String, String> endpoints = edgeHelper.get(eid);
            String sourceCategory = endpoints.getKey();
            String targetCategory = endpoints.getValue();

            List<CategoryAttrVO> edgeAttrs = eIdAttrHelper.get(eid).entrySet().stream()
                    .map(entry -> CategoryAttrVO.builder()
                            .column(entry.getKey())
                            .name(entry.getKey())
                            .type(entry.getValue())
                            .build())
                    .collect(Collectors.toCollection(ArrayList::new));

            EdgeVO rebuilt = new EdgeVO();
            rebuilt.setId(eid);
            rebuilt.setLabel(eid);
            rebuilt.setWeight("null");
            rebuilt.setSource(sourceCategory);
            rebuilt.setTarget(targetCategory);
            // An edge from a category to itself is drawn as a loop.
            if (sourceCategory.equals(targetCategory)) {
                rebuilt.setType("loop");
            } else {
                rebuilt.setType("line");
            }
            rebuilt.setAttrs(edgeAttrs);
            eIdMap.put(eid, rebuilt);
        }
    }

    /**
     * Pads nodes and links with null-valued attributes so that each one carries every attribute
     * declared by its category (for nodes) or its edge (for links).
     */
    public void repairAttrs() {
        // Add a null value for every attribute a node is missing.
        boolean anyCategoryAttr = cIdAttrHelper.values().stream().anyMatch(types -> !types.isEmpty());
        if (anyCategoryAttr) {
            for (NodeVO node : nIdMap.values()) {
                List<String> declared = cIdMap.get(node.getCategoryId()).getAttrs().stream()
                        .map(CategoryAttrVO::getName)
                        .collect(Collectors.toList());
                Set<String> present = node.getAttrs().stream()
                        .map(AttrVO::getKey)
                        .collect(Collectors.toSet());
                List<String> missing = declared.stream()
                        .filter(name -> !present.contains(name))
                        .collect(Collectors.toList());
                for (String repairKey : missing) {
                    Integer repairType = cIdAttrHelper.get(node.getCategoryId()).get(repairKey);
                    node.getAttrs().add(AttrVO.builder().key(repairKey).value(null).type(repairType).build());
                }
            }
        }

        // Same padding for links, against the attributes of their edge.
        boolean anyEdgeAttr = eIdAttrHelper.values().stream().anyMatch(types -> !types.isEmpty());
        if (anyEdgeAttr) {
            for (LinkVO link : lIdMap.values()) {
                List<String> declared = eIdMap.get(link.getEdgeId()).getAttrs().stream()
                        .map(CategoryAttrVO::getName)
                        .collect(Collectors.toList());
                Set<String> present = link.getAttrs().stream()
                        .map(AttrVO::getKey)
                        .collect(Collectors.toSet());
                List<String> missing = declared.stream()
                        .filter(name -> !present.contains(name))
                        .collect(Collectors.toList());
                for (String repairKey : missing) {
                    Integer repairType = eIdAttrHelper.get(link.getEdgeId()).get(repairKey);
                    link.getAttrs().add(AttrVO.builder().key(repairKey).value(null).type(repairType).build());
                }
            }
        }
    }

    /**
     * Writes the current categories, edges, nodes and links to JanusGraph in a single call.
     *
     * @return the id map returned by {@code createAllOneTime}
     */
    public Map<String, String> write2Janus() {
        List<CategoryVO> categories = getCategories();
        List<EdgeVO> edges = getEdges();
        List<NodeVO> nodes = getNodes();
        List<LinkVO> links = getLinks();
        return janusGraphEmbedService.createAllOneTime(
                graphId, categories, edges, cIdMap, eIdMap, nodes, links, false);
    }


    /** @return a new list holding the current categories */
    public List<CategoryVO> getCategories() {
        List<CategoryVO> snapshot = new ArrayList<>(cIdMap.size());
        snapshot.addAll(cIdMap.values());
        return snapshot;
    }

    /** @return a new list holding the current edges */
    public List<EdgeVO> getEdges() {
        List<EdgeVO> snapshot = new ArrayList<>(eIdMap.size());
        snapshot.addAll(eIdMap.values());
        return snapshot;
    }

    /** @return a new list holding the current nodes */
    public List<NodeVO> getNodes() {
        List<NodeVO> snapshot = new ArrayList<>(nIdMap.size());
        snapshot.addAll(nIdMap.values());
        return snapshot;
    }

    /** @return a new list holding the current links */
    public List<LinkVO> getLinks() {
        List<LinkVO> snapshot = new ArrayList<>(lIdMap.size());
        snapshot.addAll(lIdMap.values());
        return snapshot;
    }

    /** @return the id of the graph instance this parser was created for */
    public Long getGraphInstanceId() {
        return this.graphInstanceId;
    }

    /**
     * Feeds every element of {@code handlers} to {@code consumer}, in iteration order, stopping
     * at the first {@link IOException}.
     */
    private <T> void forEach(Iterable<T> handlers, IOConsumer<T> consumer) throws IOException {
        Iterator<T> cursor = handlers.iterator();
        while (cursor.hasNext()) {
            consumer.accept(cursor.next());
        }
    }

    /**
     * Creates a fixed pool of {@code poolSize} threads, hands it to {@code consumer} so it can
     * submit jobs, then shuts the pool down and waits for the jobs to finish.
     *
     * <p>The pool is shut down even when {@code consumer} throws, so its threads are not leaked.
     *
     * @param consumer callback that submits work to the executor
     * @throws IOException if {@code consumer} throws it
     */
    void doWithExecutor(IOConsumer<ExecutorService> consumer) throws IOException {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            consumer.accept(executor);
        } finally {
            logger.info("Awaiting termination of jobs");
            awaitTerminationAfterShutdown(executor);
        }
    }

    /**
     * A single-argument, no-result operation that is allowed to throw {@link IOException}.
     *
     * @param <T> the type of the argument
     */
    @FunctionalInterface
    public interface IOConsumer<T> {

        /**
         * Performs this operation on the given argument.
         *
         * @param t the input argument
         * @throws IOException if the operation fails with an I/O error
         */
        void accept(T t) throws IOException;
    }

    /**
     * Shuts the pool down and blocks until its jobs have finished.
     *
     * <p>Waits up to 48 hours for an orderly shutdown and forces one if that times out. If the
     * waiting thread is interrupted, the pool is forced to stop, given up to 120 seconds to
     * terminate, and the interrupt flag is restored.
     *
     * @param threadPool the pool to shut down
     */
    public static void awaitTerminationAfterShutdown(ExecutorService threadPool) {
        threadPool.shutdown();
        try {
            boolean finished = threadPool.awaitTermination(48, TimeUnit.HOURS);
            if (!finished) {
                threadPool.shutdownNow();
            }
        } catch (InterruptedException interrupted) {
            // We've been asked to shut down, maybe Ctrl+C
            threadPool.shutdownNow();
            try {
                threadPool.awaitTermination(120, TimeUnit.SECONDS);
            } catch (InterruptedException interruptedAgain) {
                // Interrupted a second time: stop waiting, the flag is restored just below.
            }
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Decides from the CSV header names whether the file describes nodes or links, and rejects
     * any header that is not valid for that kind.
     *
     * @param heads the header names of the CSV file
     * @return {@code "node"} when {@code categoryId} is present, otherwise {@code "link"} when
     *         both {@code source} and {@code target} are present
     * @throws DataScienceException if {@code id} is missing, the kind cannot be determined, or an
     *                              unexpected header is present
     */
    public static String checkHeads(List<String> heads) {
        if (!heads.contains("id")) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "id");
        }

        final String loadType;
        final List<String> allowedHeads;
        if (heads.contains("categoryId")) {
            // Node file.
            loadType = "node";
            allowedHeads = Arrays.asList("id", "categoryId", "label", "attributes", "config");
        } else if (heads.contains("source") && heads.contains("target")) {
            // Link file.
            loadType = "link";
            allowedHeads = Arrays.asList("id", "source", "target", "directed", "label", "weight", "attributes", "config");
        } else {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_CSV_HEAD_ERROR, "缺少必填字段");
        }

        for (String head : heads) {
            if (!allowedHeads.contains(head)) {
                throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_CSV_HEAD_ERROR, head);
            }
        }
        return loadType;
    }

    /**
     * Reads {@code key} from {@code obj} as an integer.
     *
     * <p>Both failure modes of the conversion are reported as
     * {@code GRAPH_LOAD_REQUIRED_INTEGER_ID}: a string that is not a number
     * ({@link NumberFormatException}) and a value fastjson cannot cast to an integer at all
     * ({@code JSONException}, e.g. a nested object).
     *
     * @param obj the record to read from
     * @param key the key to read
     * @return the integer value, or {@code null} when the key is absent
     * @throws DataScienceException if the value cannot be converted to an integer
     */
    public static Integer checkInteger(JSONObject obj, String key) {
        try {
            return obj.getInteger(key);
        } catch (NumberFormatException | com.alibaba.fastjson.JSONException e) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_INTEGER_ID, key + ": " + obj.get(key));
        }
    }

    /**
     * Reads {@code key} from {@code obj} as a JSON object.
     *
     * <p>When the direct conversion fails, the raw string is inspected: if it is wrapped as
     * {@code "{...}"} (a quoted object with doubled inner quotes), the outer quotes are stripped,
     * doubled quotes are collapsed and the result is parsed. Any other value yields {@code null}.
     *
     * @param obj the record to read from
     * @param key the key to read
     * @return the JSON object, or {@code null} when absent or not recognisable as an object
     */
    public static JSONObject checkJSONObject(JSONObject obj, String key) {
        try {
            return obj.getJSONObject(key);
        } catch (Exception notDirectlyAnObject) {
            String raw = obj.getString(key);
            boolean quotedObject = raw.startsWith("\"{") && raw.endsWith("}\"");
            if (!quotedObject) {
                return null;
            }
            String unwrapped = raw.substring(1, raw.length() - 1).replace("\"\"", "\"");
            // A parse failure here propagates to the caller.
            return JSONObject.parseObject(unwrapped);
        }
    }

    /**
     * Validates one node record without building a node: checks the required keys, records the
     * type of each attribute under the node's category and registers the node's category.
     *
     * @param o             the node record, a {@link JSONObject} or any object convertible to one
     * @param cIdAttrHelper category id -> (attribute name -> type code); updated in place
     * @param nId2Cid       node id -> category id; updated in place
     * @return always {@code true}
     * @throws DataScienceException if a required key is missing, the id is not an integer, or an
     *                              attribute's type conflicts with the type already recorded
     */
    public static boolean checkNode(Object o, Map<String, Map<String, Integer>> cIdAttrHelper, Map<Integer, String> nId2Cid) throws DataScienceException {
        JSONObject nodeObj = o instanceof JSONObject ? (JSONObject)o : (JSONObject)JSONObject.toJSON(o);
        Integer id = checkInteger(nodeObj, "id");
        String cid = nodeObj.getString("categoryId");
        if (id == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "id");
        }
        if (cid == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "categoryId");
        }
        JSONObject attrsObj = checkJSONObject(nodeObj, "attributes");
        Map<String, Integer> cMap = cIdAttrHelper.computeIfAbsent(cid, k -> new HashMap<>());
        if (attrsObj != null) {
            for (String key: attrsObj.keySet()) {
                // Type codes: 12 for strings, 2 for everything else.
                int type = attrsObj.get(key) instanceof String ? 12 : 2;
                Integer preType = cMap.putIfAbsent(key, type);
                if (preType != null && !preType.equals(type)) {
                    throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_ATTRS_TYPE_CONFLICT, key);
                }
            }
        }
        nId2Cid.put(id, cid);
        return true;
    }

    /**
     * Validates one link record without building a link: checks the required keys, resolves both
     * endpoints to their categories and records the type of each attribute under the edge id
     * {@code <sourceCategory>_<targetCategory>}.
     *
     * @param o             the link record, a {@link JSONObject} or any object convertible to one
     * @param eIdAttrHelper edge id -> (attribute name -> type code); updated in place
     * @param nId2Cid       node id -> category id, used to resolve the endpoints
     * @return always {@code true}
     * @throws DataScienceException if a required key is missing, an id is not an integer, an
     *                              endpoint is unknown, or an attribute's type conflicts with the
     *                              type already recorded
     */
    public static boolean checkLink(Object o, Map<String, Map<String, Integer>> eIdAttrHelper, Map<Integer, String> nId2Cid) throws DataScienceException {
        JSONObject linkObj = o instanceof JSONObject ? (JSONObject)o : (JSONObject)JSONObject.toJSON(o);
        Integer id = checkInteger(linkObj, "id");
        Integer source = checkInteger(linkObj, "source");
        Integer target = checkInteger(linkObj, "target");
        if (id == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "id");
        }
        if (source == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "source");
        }
        if (target == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_REQUIRED_KEY_MISS, "target");
        }

        String srcCid = nId2Cid.get(source);
        String tarCid = nId2Cid.get(target);

        if (srcCid == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_NODE_NOT_FOUND, "source: " + source);
        }
        if (tarCid == null) {
            throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_NODE_NOT_FOUND, "target: " + target);
        }

        String eid = srcCid + "_" + tarCid;
        JSONObject attrsObj = checkJSONObject(linkObj, "attributes");
        Map<String, Integer> eMap = eIdAttrHelper.computeIfAbsent(eid, k -> new HashMap<>());
        if (attrsObj != null) {
            for (String key: attrsObj.keySet()) {
                // Type codes: 12 for strings, 2 for everything else.
                int type = attrsObj.get(key) instanceof String ? 12 : 2;
                Integer preType = eMap.putIfAbsent(key, type);
                if (preType != null && !preType.equals(type)) {
                    // Name the offending attribute, as checkNode does.
                    throw DataScienceException.of(BaseErrorCode.GRAPH_LOAD_ATTRS_TYPE_CONFLICT, key);
                }
            }
        }
        return true;
    }
}
